{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession \n",
    "spark=SparkSession.builder.appName('rs').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_ratings=spark.read.csv('ml-latest-small/ratings.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- userId: integer (nullable = true)\n",
      " |-- movieId: integer (nullable = true)\n",
      " |-- rating: double (nullable = true)\n",
      " |-- timestamp: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_ratings.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_movie=spark.read.csv('ml-latest-small/movies.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- movieId: integer (nullable = true)\n",
      " |-- title: string (nullable = true)\n",
      " |-- genres: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_movie.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_ratings.createOrReplaceTempView(\"ratings\")   # 构建评分视图\n",
    "df_movie.createOrReplaceTempView(\"movies\")      # 构建电影视图"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_details = spark.sql(\"SELECT ratings.userId , ratings.movieId , movies.title , movies.genres , ratings.rating  FROM ratings   \\\n",
    "                        INNER JOIN movies ON ratings.movieId = movies.movieId \")    # 两表关联，获取具体的信息"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- userId: integer (nullable = true)\n",
      " |-- movieId: integer (nullable = true)\n",
      " |-- title: string (nullable = true)\n",
      " |-- genres: string (nullable = true)\n",
      " |-- rating: double (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_details.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(100836, 5)\n"
     ]
    }
   ],
   "source": [
    "print((df_details.count(),len(df_details.columns))) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-----+\n",
      "|userId|count|\n",
      "+------+-----+\n",
      "|414   |2698 |\n",
      "|599   |2478 |\n",
      "|474   |2108 |\n",
      "|448   |1864 |\n",
      "|274   |1346 |\n",
      "|610   |1302 |\n",
      "|68    |1260 |\n",
      "|380   |1218 |\n",
      "|606   |1115 |\n",
      "|288   |1055 |\n",
      "+------+-----+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_details.groupBy('userId').count().orderBy('count',ascending=False).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "100836"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_details.na.drop().count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+\n",
      "|summary|            rating|\n",
      "+-------+------------------+\n",
      "|  count|            100836|\n",
      "|   mean| 3.501556983616962|\n",
      "| stddev|1.0425292390606342|\n",
      "|    min|               0.5|\n",
      "|    25%|               3.0|\n",
      "|    50%|               3.5|\n",
      "|    75%|               4.0|\n",
      "|    max|               5.0|\n",
      "+-------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_details.summary().select('summary','rating').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_details.createOrReplaceTempView(\"df_details\") "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+-----+------+------+\n",
      "|userId|movieId|title|genres|rating|\n",
      "+------+-------+-----+------+------+\n",
      "+------+-------+-----+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark.sql(\" select * from df_details where title is null \").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StringIndexer,IndexToString"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [],
   "source": [
    "stringIndexer = StringIndexer(inputCol=\"title\", outputCol=\"title_num\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [],
   "source": [
    "model = stringIndexer.fit(df_details) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "indexed = model.transform(df_details) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+--------------------+--------------------+------+---------+\n",
      "|userId|movieId|               title|              genres|rating|title_num|\n",
      "+------+-------+--------------------+--------------------+------+---------+\n",
      "|     1|      1|    Toy Story (1995)|Adventure|Animati...|   4.0|     11.0|\n",
      "|     1|      3|Grumpier Old Men ...|      Comedy|Romance|   4.0|    422.0|\n",
      "|     1|      6|         Heat (1995)|Action|Crime|Thri...|   4.0|    129.0|\n",
      "|     1|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|   5.0|     15.0|\n",
      "|     1|     50|Usual Suspects, T...|Crime|Mystery|Thr...|   5.0|     13.0|\n",
      "|     1|     70|From Dusk Till Da...|Action|Comedy|Hor...|   3.0|    385.0|\n",
      "|     1|    101|Bottle Rocket (1996)|Adventure|Comedy|...|   5.0|   1130.0|\n",
      "|     1|    110|   Braveheart (1995)|    Action|Drama|War|   4.0|      7.0|\n",
      "|     1|    151|      Rob Roy (1995)|Action|Drama|Roma...|   5.0|    533.0|\n",
      "|     1|    157|Canadian Bacon (1...|          Comedy|War|   5.0|   2053.0|\n",
      "+------+-------+--------------------+--------------------+------+---------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "indexed.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+\n",
      "|title_num|count|\n",
      "+---------+-----+\n",
      "|0.0      |329  |\n",
      "|1.0      |317  |\n",
      "|2.0      |307  |\n",
      "+---------+-----+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "indexed.groupBy('title_num').count().orderBy('count',ascending=False).show(3,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "train,test=indexed.randomSplit([0.7,0.3])  #将训练和测试数据集以7比3划分"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.recommendation import ALS  #导入推荐系统中的ALS算法"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "rec=ALS(maxIter=10,regParam=0.01,userCol='userId',itemCol='title_num',ratingCol='rating',nonnegative=True,coldStartStrategy=\"drop\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [],
   "source": [
    "rec_model=rec.fit(train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [],
   "source": [
    "predicted_ratings=rec_model.transform(test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- userId: integer (nullable = true)\n",
      " |-- movieId: integer (nullable = true)\n",
      " |-- title: string (nullable = true)\n",
      " |-- genres: string (nullable = true)\n",
      " |-- rating: double (nullable = true)\n",
      " |-- title_num: double (nullable = false)\n",
      " |-- prediction: float (nullable = false)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "predicted_ratings.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-------+---------------------------+-------------------------------------+------+---------+----------+\n",
      "|userId|movieId|title                      |genres                               |rating|title_num|prediction|\n",
      "+------+-------+---------------------------+-------------------------------------+------+---------+----------+\n",
      "|305   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |2.8269906 |\n",
      "|20    |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |3.205861  |\n",
      "|169   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |3.795758  |\n",
      "|430   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.5   |148.0    |4.0312037 |\n",
      "|64    |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |2.7575402 |\n",
      "|590   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |2.8200083 |\n",
      "|381   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |3.5549805 |\n",
      "|610   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |3.1632247 |\n",
      "|7     |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|4.0   |148.0    |1.8177614 |\n",
      "|247   |6365   |Matrix Reloaded, The (2003)|Action|Adventure|Sci-Fi|Thriller|IMAX|3.0   |148.0    |3.2867253 |\n",
      "+------+-------+---------------------------+-------------------------------------+------+---------+----------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "predicted_ratings.show(10,truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import RegressionEvaluator        # RegressionEvaluator 回归评估器，它期望两个输入列:预测和标签。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [],
   "source": [
    "evaluator=RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [],
   "source": [
    "rmse=evaluator.evaluate(predicted_ratings)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "标准误差：1.04349573121\n"
     ]
    }
   ],
   "source": [
    "print('{}{}'.format(\"标准误差：\",rmse))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [],
   "source": [
    "unique_movies=indexed.select('title_num').distinct()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "9719"
      ]
     },
     "execution_count": 53,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "unique_movies.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [],
   "source": [
    "all_movie = unique_movies.alias('all')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {},
   "outputs": [],
   "source": [
    "watched_movies=indexed.filter(indexed['userId'] == 46).select('title_num').distinct()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "42"
      ]
     },
     "execution_count": 56,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "watched_movies.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {},
   "outputs": [],
   "source": [
    "no_46=watched_movies.alias('no_46')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {},
   "outputs": [],
   "source": [
    "total_movies = all_movie.join(no_46, all_movie.title_num == no_46.title_num,how='left')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---------+\n",
      "|title_num|title_num|\n",
      "+---------+---------+\n",
      "|299.0    |null     |\n",
      "|305.0    |305.0    |\n",
      "|496.0    |null     |\n",
      "|558.0    |null     |\n",
      "|596.0    |null     |\n",
      "|692.0    |null     |\n",
      "|769.0    |null     |\n",
      "|934.0    |null     |\n",
      "|1051.0   |null     |\n",
      "|1761.0   |null     |\n",
      "+---------+---------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "total_movies.show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *\n",
    "remaining_movies=total_movies.where(col(\"no_46.title_num\").isNull()).select(all_movie.title_num).distinct() "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "metadata": {},
   "outputs": [],
   "source": [
    "remaining_movies=remaining_movies.withColumn(\"userId\",lit(46))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "metadata": {},
   "outputs": [],
   "source": [
    "recommendations=rec_model.transform(remaining_movies).orderBy('prediction',ascending=False)\t"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "recommendations.show(100,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
